// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task_scheduler/scheduler_worker.h"

#include <stddef.h>

#include <memory>
#include <vector>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/synchronization/condition_variable.h"
#include "base/task_scheduler/scheduler_lock.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/task.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace base {
namespace internal {
    namespace {

        // Number of Sequences the test delegate hands to the worker in each
        // parameterized test: ContinuousWork requests all of them up front and wakes
        // the worker once, IntermittentWork requests one per wake-up.
        const size_t kNumSequencesPerTest = 150;

        // Fixture that creates a SchedulerWorker driven by a
        // TestSchedulerWorkerDelegate and records, under |lock_|, what the worker asks
        // of that delegate (GetWork() calls, created/re-enqueued Sequences, Tasks
        // run). The test parameter is the number of Tasks per Sequence returned by
        // GetWork().
        class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
        protected:
            TaskSchedulerWorkerTest()
                : main_entry_called_(WaitableEvent::ResetPolicy::MANUAL,
                    WaitableEvent::InitialState::NOT_SIGNALED)
                , num_get_work_cv_(lock_.CreateConditionVariable())
                , worker_set_(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED)
            {
            }

            // Creates |worker_| and blocks until the delegate's OnMainEntry() has
            // signaled |main_entry_called_|.
            void SetUp() override
            {
                worker_ = SchedulerWorker::Create(
                    ThreadPriority::NORMAL,
                    WrapUnique(new TestSchedulerWorkerDelegate(this)),
                    &task_tracker_,
                    SchedulerWorker::InitialState::ALIVE);
                ASSERT_TRUE(worker_);
                // OnMainEntry() waits on |worker_set_| before it compares its argument
                // against |worker_|, so only signal once |worker_| is assigned.
                worker_set_.Signal();
                main_entry_called_.Wait();
            }

            void TearDown() override
            {
                // gtest still invokes TearDown() when SetUp() aborted on a fatal
                // failure (e.g. the ASSERT_TRUE(worker_) above), in which case
                // |worker_| is null and must not be dereferenced.
                if (worker_)
                    worker_->JoinForTesting();
            }

            // Number of Tasks pushed into each Sequence created by GetWork().
            size_t TasksPerSequence() const { return GetParam(); }

            // Wait until GetWork() has been called |num_get_work| times.
            void WaitForNumGetWork(size_t num_get_work)
            {
                AutoSchedulerLock auto_lock(lock_);
                while (num_get_work_ < num_get_work)
                    num_get_work_cv_->Wait();
            }

            // Sets the upper bound that GetWork() checks its call count against.
            void SetMaxGetWork(size_t max_get_work)
            {
                AutoSchedulerLock auto_lock(lock_);
                max_get_work_ = max_get_work;
            }

            // Sets how many Sequences GetWork() should create before it starts
            // returning nullptr. Expects the previous budget to be fully consumed.
            void SetNumSequencesToCreate(size_t num_sequences_to_create)
            {
                AutoSchedulerLock auto_lock(lock_);
                EXPECT_EQ(0U, num_sequences_to_create_);
                num_sequences_to_create_ = num_sequences_to_create;
            }

            // Returns the number of times RunTaskCallback() has been invoked.
            size_t NumRunTasks()
            {
                AutoSchedulerLock auto_lock(lock_);
                return num_run_tasks_;
            }

            // Returns a copy of the Sequences created by GetWork(), in creation order.
            std::vector<scoped_refptr<Sequence>> CreatedSequences()
            {
                AutoSchedulerLock auto_lock(lock_);
                return created_sequences_;
            }

            // Returns a copy of the Sequences passed to ReEnqueueSequence(), in the
            // order in which they were received.
            std::vector<scoped_refptr<Sequence>> EnqueuedSequences()
            {
                AutoSchedulerLock auto_lock(lock_);
                return re_enqueued_sequences_;
            }

            // The worker under test. Assigned in SetUp() and joined in TearDown().
            std::unique_ptr<SchedulerWorker> worker_;

        private:
            // Delegate handed to |worker_|. It forwards everything it observes to the
            // enclosing fixture through |outer_|.
            class TestSchedulerWorkerDelegate
                : public SchedulerWorker::Delegate {
            public:
                // |outer| is not owned; it is dereferenced on every delegate call.
                explicit TestSchedulerWorkerDelegate(TaskSchedulerWorkerTest* outer)
                    : outer_(outer)
                {
                }

                // SchedulerWorker::Delegate:
                void OnMainEntry(SchedulerWorker* worker) override
                {
                    // |worker_| is assigned by SetUp() on another thread; wait until it
                    // is set before reading it.
                    outer_->worker_set_.Wait();
                    EXPECT_EQ(outer_->worker_.get(), worker);

                    // Without synchronization, OnMainEntry() could be called twice without
                    // generating an error.
                    AutoSchedulerLock auto_lock(outer_->lock_);
                    EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
                    outer_->main_entry_called_.Signal();
                }

                // Counts the call, then returns a new Sequence holding
                // TasksPerSequence() Tasks while |num_sequences_to_create_| is non-zero,
                // or nullptr once that budget is exhausted.
                scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override
                {
                    EXPECT_EQ(outer_->worker_.get(), worker);

                    {
                        AutoSchedulerLock auto_lock(outer_->lock_);

                        // Increment the number of times that this method has been called.
                        ++outer_->num_get_work_;
                        outer_->num_get_work_cv_->Signal();

                        // Verify that this method isn't called more times than expected.
                        EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);

                        // Check if a Sequence should be returned.
                        if (outer_->num_sequences_to_create_ == 0)
                            return nullptr;
                        --outer_->num_sequences_to_create_;
                    }

                    // Create a Sequence with TasksPerSequence() Tasks.
                    scoped_refptr<Sequence> sequence(new Sequence);
                    for (size_t i = 0; i < outer_->TasksPerSequence(); ++i) {
                        std::unique_ptr<Task> task(new Task(
                            FROM_HERE, Bind(&TaskSchedulerWorkerTest::RunTaskCallback, Unretained(outer_)),
                            TaskTraits(), TimeDelta()));
                        EXPECT_TRUE(outer_->task_tracker_.WillPostTask(task.get()));
                        sequence->PushTask(std::move(task));
                    }

                    {
                        // Add the Sequence to the vector of created Sequences.
                        AutoSchedulerLock auto_lock(outer_->lock_);
                        outer_->created_sequences_.push_back(sequence);
                    }

                    return sequence;
                }

                // This override verifies that |sequence| contains the expected number of
                // Tasks and adds it to |re_enqueued_sequences_|. Unlike a normal
                // ReEnqueueSequence implementation, it doesn't reinsert |sequence| into a
                // queue for further execution.
                void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override
                {
                    // A Sequence created with a single Task has nothing left to
                    // re-enqueue, so this is only expected with more than one Task.
                    EXPECT_GT(outer_->TasksPerSequence(), 1U);

                    // Verify that |sequence| contains TasksPerSequence() - 1 Tasks.
                    // Note that this drains |sequence|.
                    for (size_t i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
                        EXPECT_TRUE(sequence->PeekTask());
                        sequence->PopTask();
                    }
                    EXPECT_FALSE(sequence->PeekTask());

                    // Add |sequence| to |re_enqueued_sequences_|.
                    AutoSchedulerLock auto_lock(outer_->lock_);
                    outer_->re_enqueued_sequences_.push_back(std::move(sequence));
                    EXPECT_LE(outer_->re_enqueued_sequences_.size(),
                        outer_->created_sequences_.size());
                }

                TimeDelta GetSleepTimeout() override
                {
                    return TimeDelta::Max();
                }

                // The fixture's worker is never allowed to detach.
                bool CanDetach(SchedulerWorker* worker) override
                {
                    return false;
                }

            private:
                // The fixture that created this delegate. Not owned.
                TaskSchedulerWorkerTest* outer_;
            };

            // Bound into every Task created by GetWork(); counts executed Tasks.
            void RunTaskCallback()
            {
                AutoSchedulerLock auto_lock(lock_);
                ++num_run_tasks_;
                EXPECT_LE(num_run_tasks_, created_sequences_.size());
            }

            TaskTracker task_tracker_;

            // Synchronizes access to all members below.
            mutable SchedulerLock lock_;

            // Signaled once OnMainEntry() has been called.
            WaitableEvent main_entry_called_;

            // Number of Sequences that should be created by GetWork(). When this
            // is 0, GetWork() returns nullptr.
            size_t num_sequences_to_create_ = 0;

            // Number of times that GetWork() has been called.
            size_t num_get_work_ = 0;

            // Maximum number of times that GetWork() can be called.
            size_t max_get_work_ = 0;

            // Condition variable signaled when |num_get_work_| is incremented.
            std::unique_ptr<ConditionVariable> num_get_work_cv_;

            // Sequences created by GetWork().
            std::vector<scoped_refptr<Sequence>> created_sequences_;

            // Sequences passed to ReEnqueueSequence().
            std::vector<scoped_refptr<Sequence>> re_enqueued_sequences_;

            // Number of times that RunTaskCallback() has been called.
            size_t num_run_tasks_ = 0;

            // Signaled after |worker_| is set.
            WaitableEvent worker_set_;

            DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerTest);
        };

        // Verify that when GetWork() continuously returns Sequences, all Tasks in these
        // Sequences run successfully. The test wakes up the SchedulerWorker once.
        TEST_P(TaskSchedulerWorkerTest, ContinuousWork)
        {
            // Have GetWork() hand out |kNumSequencesPerTest| Sequences before it
            // starts returning nullptr.
            SetNumSequencesToCreate(kNumSequencesPerTest);

            // GetWork() is expected to be called once per Sequence it returns, plus
            // one final time in which it returns nullptr.
            const size_t num_get_work_calls = kNumSequencesPerTest + 1;
            SetMaxGetWork(num_get_work_calls);

            // A single wake-up is enough: block until GetWork() has been invoked the
            // expected number of times.
            worker_->WakeUp();
            WaitForNumGetWork(num_get_work_calls);

            // Exactly one Task per created Sequence should have run.
            EXPECT_EQ(kNumSequencesPerTest, NumRunTasks());

            // A Sequence created with more than one Task is not empty after the worker
            // pops a Task from it, so it should have been handed to
            // ReEnqueueSequence(). With a single Task per Sequence nothing should have
            // been re-enqueued.
            if (TasksPerSequence() > 1) {
                EXPECT_EQ(CreatedSequences(), EnqueuedSequences());
            } else {
                EXPECT_TRUE(EnqueuedSequences().empty());
            }
        }

        // Verify that when GetWork() alternates between returning a Sequence and
        // returning nullptr, all Tasks in the returned Sequences run successfully. The
        // test wakes up the SchedulerWorker once for each Sequence.
        TEST_P(TaskSchedulerWorkerTest, IntermittentWork)
        {
            // |wake_up| counts the wake-ups issued so far, including the current one.
            for (size_t wake_up = 1; wake_up <= kNumSequencesPerTest; ++wake_up) {
                // Have GetWork() hand out exactly one Sequence before it goes back
                // to returning nullptr.
                SetNumSequencesToCreate(1);

                // Each wake-up accounts for one GetWork() call that returns a
                // Sequence and one that returns nullptr.
                const size_t num_get_work_calls = 2 * wake_up;
                SetMaxGetWork(num_get_work_calls);

                // Wake up |worker_| and block until GetWork() has been invoked the
                // expected number of times.
                worker_->WakeUp();
                WaitForNumGetWork(num_get_work_calls);

                // One Task should have run per wake-up so far.
                EXPECT_EQ(wake_up, NumRunTasks());

                // A Sequence created with more than one Task is not empty after the
                // worker pops a Task from it, so it should have been handed to
                // ReEnqueueSequence(). With a single Task per Sequence nothing should
                // have been re-enqueued.
                if (TasksPerSequence() > 1) {
                    EXPECT_EQ(CreatedSequences(), EnqueuedSequences());
                } else {
                    EXPECT_TRUE(EnqueuedSequences().empty());
                }
            }
        }

        // Run the parameterized tests with one Task per Sequence. In this
        // configuration the tests expect that no Sequence is re-enqueued.
        INSTANTIATE_TEST_CASE_P(OneTaskPerSequence,
            TaskSchedulerWorkerTest,
            ::testing::Values(1));
        // Run the parameterized tests with two Tasks per Sequence. In this
        // configuration the tests expect every created Sequence to be re-enqueued.
        INSTANTIATE_TEST_CASE_P(TwoTasksPerSequence,
            TaskSchedulerWorkerTest,
            ::testing::Values(2));

        namespace {

            // Delegate that hands out a single one-Task Sequence (until ResetState()
            // rearms it) and whose answer to CanDetach() is chosen by the test through
            // set_can_detach(). Tests observe progress through WaitForWorkToRun() and
            // WaitForDetachRequest().
            class ControllableDetachDelegate : public SchedulerWorker::Delegate {
            public:
                ControllableDetachDelegate()
                    : work_processed_(WaitableEvent::ResetPolicy::MANUAL,
                        WaitableEvent::InitialState::NOT_SIGNALED)
                    , detach_requested_(WaitableEvent::ResetPolicy::MANUAL,
                          WaitableEvent::InitialState::NOT_SIGNALED)
                {
                }

                ~ControllableDetachDelegate() override = default;

                // SchedulerWorker::Delegate:
                void OnMainEntry(SchedulerWorker* worker) override { }

                scoped_refptr<Sequence> GetWork(SchedulerWorker* worker)
                    override
                {
                    // Sends one item of work to signal |work_processed_|. On subsequent calls,
                    // sends nullptr to indicate there's no more work to be done.
                    if (work_requested_)
                        return nullptr;

                    work_requested_ = true;
                    scoped_refptr<Sequence> sequence(new Sequence);
                    std::unique_ptr<Task> task(new Task(
                        FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&work_processed_)),
                        TaskTraits(), TimeDelta()));
                    sequence->PushTask(std::move(task));
                    return sequence;
                }

                void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override
                {
                    ADD_FAILURE() << "GetWork() returns a sequence of one, so there's nothing to reenqueue.";
                }

                TimeDelta GetSleepTimeout() override
                {
                    return TimeDelta::Max();
                }

                // Signals |detach_requested_| and returns the value last passed to
                // set_can_detach().
                bool CanDetach(SchedulerWorker* worker) override
                {
                    // Read the flag before signaling: a test thread blocked in
                    // WaitForDetachRequest() may call set_can_detach() as soon as the
                    // event is signaled, and that write must not overlap this read.
                    const bool can_detach = can_detach_;
                    detach_requested_.Signal();
                    return can_detach;
                }

                // Blocks until the Task handed out by GetWork() has run.
                void WaitForWorkToRun()
                {
                    work_processed_.Wait();
                }

                // Blocks until CanDetach() has been called.
                void WaitForDetachRequest()
                {
                    detach_requested_.Wait();
                }

                // Rearms the delegate: the next GetWork() call hands out a new
                // Sequence and both events can be waited on again. Performs no
                // locking; the caller in this file invokes it only after asserting
                // that the worker's thread is no longer alive.
                void ResetState()
                {
                    work_requested_ = false;
                    work_processed_.Reset();
                    detach_requested_.Reset();
                }

                // Sets the value that subsequent CanDetach() calls return.
                void set_can_detach(bool can_detach) { can_detach_ = can_detach; }

            private:
                // True once GetWork() has handed out its Sequence.
                bool work_requested_ = false;
                // Value returned by CanDetach().
                bool can_detach_ = false;
                // Signaled by the Task that GetWork() places in its Sequence.
                WaitableEvent work_processed_;
                // Signaled each time CanDetach() is called.
                WaitableEvent detach_requested_;

                DISALLOW_COPY_AND_ASSIGN(ControllableDetachDelegate);
            };

        } // namespace

        // Verify that a worker whose delegate allows detachment no longer has a
        // live thread once it has run its work and asked to detach.
        TEST(TaskSchedulerWorkerTest, WorkerDetaches)
        {
            TaskTracker task_tracker;
            // Will be owned by SchedulerWorker.
            ControllableDetachDelegate* delegate = new ControllableDetachDelegate;
            delegate->set_can_detach(true);
            std::unique_ptr<SchedulerWorker> worker = SchedulerWorker::Create(
                ThreadPriority::NORMAL, WrapUnique(delegate), &task_tracker,
                SchedulerWorker::InitialState::ALIVE);
            // Report a failure rather than dereferencing a null |worker| below if
            // creation did not succeed.
            ASSERT_TRUE(worker);
            worker->WakeUp();
            delegate->WaitForWorkToRun();
            delegate->WaitForDetachRequest();
            // Sleep to give a chance for the detach to happen. A yield is too short.
            PlatformThread::Sleep(TimeDelta::FromMilliseconds(50));
            ASSERT_FALSE(worker->ThreadAliveForTesting());
        }

        // Verify that a detached worker gets a live thread again when it is woken
        // up, and keeps it when the delegate then refuses detachment.
        TEST(TaskSchedulerWorkerTest, WorkerDetachesAndWakes)
        {
            TaskTracker task_tracker;
            // Will be owned by SchedulerWorker.
            ControllableDetachDelegate* delegate = new ControllableDetachDelegate;
            delegate->set_can_detach(true);
            std::unique_ptr<SchedulerWorker> worker = SchedulerWorker::Create(
                ThreadPriority::NORMAL, WrapUnique(delegate), &task_tracker,
                SchedulerWorker::InitialState::ALIVE);
            // Report a failure rather than dereferencing a null |worker| below if
            // creation did not succeed.
            ASSERT_TRUE(worker);
            worker->WakeUp();
            delegate->WaitForWorkToRun();
            delegate->WaitForDetachRequest();
            // Sleep to give a chance for the detach to happen. A yield is too short.
            PlatformThread::Sleep(TimeDelta::FromMilliseconds(50));
            ASSERT_FALSE(worker->ThreadAliveForTesting());

            // Rearm the delegate so that it hands out work again, this time
            // refusing to detach afterwards.
            delegate->ResetState();
            delegate->set_can_detach(false);
            worker->WakeUp();
            delegate->WaitForWorkToRun();
            delegate->WaitForDetachRequest();
            // Same grace period as above, so that a (wrongful) detach would have had
            // time to take effect before the check.
            PlatformThread::Sleep(TimeDelta::FromMilliseconds(50));
            ASSERT_TRUE(worker->ThreadAliveForTesting());
            worker->JoinForTesting();
        }

        // Verify that a worker created in the DETACHED state has no live thread
        // until it is woken up, and has one after it has processed work.
        TEST(TaskSchedulerWorkerTest, CreateDetached)
        {
            TaskTracker task_tracker;
            // Will be owned by SchedulerWorker.
            ControllableDetachDelegate* delegate = new ControllableDetachDelegate;
            std::unique_ptr<SchedulerWorker> worker = SchedulerWorker::Create(
                ThreadPriority::NORMAL, WrapUnique(delegate), &task_tracker,
                SchedulerWorker::InitialState::DETACHED);
            // Report a failure rather than dereferencing a null |worker| below if
            // creation did not succeed.
            ASSERT_TRUE(worker);
            ASSERT_FALSE(worker->ThreadAliveForTesting());
            worker->WakeUp();
            delegate->WaitForWorkToRun();
            delegate->WaitForDetachRequest();
            // The delegate's CanDetach() returns false by default, so the thread is
            // expected to still be alive here.
            ASSERT_TRUE(worker->ThreadAliveForTesting());
            worker->JoinForTesting();
        }

    } // namespace
} // namespace internal
} // namespace base
